This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new e5e0c0d9b Event Listeners (#922)
e5e0c0d9b is described below

commit e5e0c0d9b271550949abc3bb1cd6e0733204b16c
Author: Andrew Guterman <andrew.guterm...@gmail.com>
AuthorDate: Wed May 7 10:28:29 2025 -0700

    Event Listeners (#922)
    
    Implementation of event listeners discussed 
[here](https://lists.apache.org/thread/03yz5wolkvy8l7rbcwjnqdq1bl8p065v).
    
    I decided to keep this implementation generic and not take a dependency on 
Jakarta Events nor Vertx busses. It's easy to extend this, either within 
Polaris or in an external PolarisEventListener, and handle events however one 
wishes.
    
    Some high level notes:
    - PolarisEventListener is the main interface with all the event methods 
such as `onBeforeRequestRateLimited`
    - DefaultPolarisEventListener is an empty implementation which allows users 
to only partially implement event handlers
    - `polaris.event-listener.type` is the config that lets you specify your event 
listener implementation
---
 .../src/main/resources/application.properties      |  2 +
 .../quarkus/config/ProductionReadinessChecks.java  | 12 +++
 .../service/quarkus/config/QuarkusProducers.java   |  9 ++
 .../QuarkusPolarisEventListenerConfiguration.java  | 32 +++++++
 .../quarkus/task/QuarkusTaskExecutorImpl.java      |  8 +-
 .../quarkus/admin/PolarisAuthzTestBase.java        | 13 ++-
 .../quarkus/catalog/GenericTableCatalogTest.java   |  4 +-
 .../catalog/IcebergCatalogHandlerAuthzTest.java    |  3 +-
 .../quarkus/catalog/IcebergCatalogTest.java        | 83 +++++++++++++-----
 .../quarkus/catalog/IcebergCatalogViewTest.java    | 57 ++++++++++++-
 .../service/quarkus/catalog/PolicyCatalogTest.java |  4 +-
 .../quarkus/ratelimiter/RateLimiterFilterTest.java | 13 ++-
 .../polaris/service/quarkus/test/TestData.java     | 36 ++++++++
 .../service/catalog/iceberg/IcebergCatalog.java    | 31 ++++++-
 .../context/PolarisCallContextCatalogFactory.java  |  9 +-
 .../service/events/AfterTableCommitedEvent.java    | 34 ++++++++
 .../service/events/AfterTableRefreshedEvent.java   | 28 ++++++
 .../service/events/AfterTaskAttemptedEvent.java    | 34 ++++++++
 .../service/events/AfterViewCommitedEvent.java     | 33 ++++++++
 .../service/events/AfterViewRefreshedEvent.java    | 28 ++++++
 .../events/BeforeRequestRateLimitedEvent.java      | 28 ++++++
 .../service/events/BeforeTableCommitedEvent.java   | 35 ++++++++
 .../service/events/BeforeTableRefreshedEvent.java  | 29 +++++++
 .../service/events/BeforeTaskAttemptedEvent.java   | 32 +++++++
 .../service/events/BeforeViewCommitedEvent.java    | 34 ++++++++
 .../service/events/BeforeViewRefreshedEvent.java   | 29 +++++++
 .../service/events/NoOpPolarisEventListener.java   | 27 ++++++
 .../polaris/service/events/PolarisEvent.java       | 25 ++++++
 .../service/events/PolarisEventListener.java       | 58 +++++++++++++
 .../service/events/TestPolarisEventListener.java   | 91 ++++++++++++++++++++
 .../service/ratelimiter/RateLimiterFilter.java     |  9 +-
 .../polaris/service/task/TaskExecutorImpl.java     | 95 ++++++++++++---------
 .../service/catalog/io/FileIOFactoryTest.java      |  3 +-
 .../polaris/service/task/TaskExecutorImplTest.java | 99 ++++++++++++++++++++++
 .../org/apache/polaris/service/TestServices.java   | 12 ++-
 35 files changed, 999 insertions(+), 80 deletions(-)

diff --git a/quarkus/defaults/src/main/resources/application.properties 
b/quarkus/defaults/src/main/resources/application.properties
index d9b82239e..0482f3de0 100644
--- a/quarkus/defaults/src/main/resources/application.properties
+++ b/quarkus/defaults/src/main/resources/application.properties
@@ -124,6 +124,8 @@ polaris.secrets-manager.type=in-memory
 
 polaris.file-io.type=default
 
+polaris.event-listener.type=no-op
+
 polaris.log.request-id-header-name=Polaris-Request-Id
 # polaris.log.mdc.aid=polaris
 # polaris.log.mdc.sid=polaris-service
diff --git 
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java
 
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java
index 5d9524f4c..2389537f1 100644
--- 
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java
+++ 
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java
@@ -36,6 +36,8 @@ import org.apache.polaris.service.auth.AuthenticationType;
 import org.apache.polaris.service.context.DefaultRealmContextResolver;
 import org.apache.polaris.service.context.RealmContextResolver;
 import org.apache.polaris.service.context.TestRealmContextResolver;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
 import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
 import 
org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration;
 import org.eclipse.microprofile.config.Config;
@@ -161,6 +163,16 @@ public class ProductionReadinessChecks {
     return ProductionReadinessCheck.OK;
   }
 
+  /**
+   * Flags the test-only event listener as a production-readiness problem.
+   *
+   * @param polarisEventListener the event listener bean selected for this deployment
+   * @return a check carrying an error that points at {@code polaris.event-listener.type} when the
+   *     listener is a {@link TestPolarisEventListener}; {@link ProductionReadinessCheck#OK}
+   *     otherwise
+   */
+  @Produces
+  public ProductionReadinessCheck checkPolarisEventListener(
+      PolarisEventListener polarisEventListener) {
+    if (polarisEventListener instanceof TestPolarisEventListener) {
+      // The reported key must match the property actually read by
+      // QuarkusPolarisEventListenerConfiguration (prefix "polaris.event-listener"), otherwise the
+      // operator is sent looking for a setting that does not exist.
+      return ProductionReadinessCheck.of(
+          Error.of(
+              "TestPolarisEventListener is intended for tests only.",
+              "polaris.event-listener.type"));
+    }
+    return ProductionReadinessCheck.OK;
+  }
+
   private static String authRealmSegment(String realm) {
     return realm.equals(QuarkusAuthenticationConfiguration.DEFAULT_REALM_KEY) 
? "" : realm + ".";
   }
diff --git 
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java
 
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java
index 122921e82..89bf2c6fb 100644
--- 
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java
+++ 
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java
@@ -62,11 +62,13 @@ import 
org.apache.polaris.service.config.RealmEntityManagerFactory;
 import org.apache.polaris.service.context.RealmContextConfiguration;
 import org.apache.polaris.service.context.RealmContextFilter;
 import org.apache.polaris.service.context.RealmContextResolver;
+import org.apache.polaris.service.events.PolarisEventListener;
 import 
org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration;
 import 
org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationRealmConfiguration;
 import 
org.apache.polaris.service.quarkus.auth.external.tenant.OidcTenantResolver;
 import 
org.apache.polaris.service.quarkus.catalog.io.QuarkusFileIOConfiguration;
 import 
org.apache.polaris.service.quarkus.context.QuarkusRealmContextConfiguration;
+import 
org.apache.polaris.service.quarkus.events.QuarkusPolarisEventListenerConfiguration;
 import 
org.apache.polaris.service.quarkus.persistence.QuarkusPersistenceConfiguration;
 import 
org.apache.polaris.service.quarkus.ratelimiter.QuarkusRateLimiterFilterConfiguration;
 import 
org.apache.polaris.service.quarkus.ratelimiter.QuarkusTokenBucketConfiguration;
@@ -155,6 +157,13 @@ public class QuarkusProducers {
     return fileIOFactories.select(Identifier.Literal.of(config.type())).get();
   }
 
+  /**
+   * Produces the {@link PolarisEventListener} whose identifier equals the configured listener
+   * type.
+   *
+   * @param config event-listener configuration supplying the identifier to look up
+   * @param polarisEventListeners all available {@link PolarisEventListener} beans
+   * @return the listener bean selected by {@code config.type()}
+   */
+  @Produces
+  public PolarisEventListener polarisEventListener(
+      QuarkusPolarisEventListenerConfiguration config,
+      @Any Instance<PolarisEventListener> polarisEventListeners) {
+    // Narrow the candidate beans to the one carrying the configured identifier.
+    Instance<PolarisEventListener> configuredListener =
+        polarisEventListeners.select(Identifier.Literal.of(config.type()));
+    return configuredListener.get();
+  }
+
   @Produces
   public MetaStoreManagerFactory metaStoreManagerFactory(
       QuarkusPersistenceConfiguration config,
diff --git 
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java
 
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java
new file mode 100644
index 000000000..8921c726c
--- /dev/null
+++ 
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.events;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+
+@StaticInitSafe
+@ConfigMapping(prefix = "polaris.event-listener") // binds the polaris.event-listener.* keys
+public interface QuarkusPolarisEventListenerConfiguration {
+  /**
+   * The type of the event listener to use. Must be a registered {@link
+   * org.apache.polaris.service.events.PolarisEventListener} identifier.
+   *
+   * <p>Read from {@code polaris.event-listener.type}; the default application.properties sets it
+   * to {@code no-op}.
+   */
+  String type();
+}
diff --git 
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java
 
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java
index 8f38de648..3e16edb5a 100644
--- 
a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java
+++ 
b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java
@@ -29,6 +29,7 @@ import jakarta.inject.Inject;
 import java.util.concurrent.ExecutorService;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.PolarisEventListener;
 import org.apache.polaris.service.quarkus.tracing.QuarkusTracingFilter;
 import org.apache.polaris.service.task.TaskExecutorImpl;
 import org.apache.polaris.service.task.TaskFileIOSupplier;
@@ -39,7 +40,7 @@ public class QuarkusTaskExecutorImpl extends TaskExecutorImpl 
{
   private final Tracer tracer;
 
   public QuarkusTaskExecutorImpl() {
-    this(null, null, null, null);
+    this(null, null, null, null, null);
   }
 
   @Inject
@@ -47,8 +48,9 @@ public class QuarkusTaskExecutorImpl extends TaskExecutorImpl 
{
       @Identifier("task-executor") ExecutorService executorService,
       MetaStoreManagerFactory metaStoreManagerFactory,
       TaskFileIOSupplier fileIOSupplier,
-      Tracer tracer) {
-    super(executorService, metaStoreManagerFactory, fileIOSupplier);
+      Tracer tracer,
+      PolarisEventListener polarisEventListener) {
+    super(executorService, metaStoreManagerFactory, fileIOSupplier, 
polarisEventListener);
     this.tracer = tracer;
   }
 
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 6f3498d31..5761812f4 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -88,6 +88,7 @@ import 
org.apache.polaris.service.config.DefaultConfigurationStore;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
 import org.apache.polaris.service.context.CallContextCatalogFactory;
 import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.events.PolarisEventListener;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TaskExecutor;
 import org.apache.polaris.service.types.PolicyIdentifier;
@@ -188,6 +189,7 @@ public abstract class PolarisAuthzTestBase {
   @Inject protected PolarisDiagnostics diagServices;
   @Inject protected Clock clock;
   @Inject protected FileIOFactory fileIOFactory;
+  @Inject protected PolarisEventListener polarisEventListener;
 
   protected IcebergCatalog baseCatalog;
   protected GenericTableCatalog genericTableCatalog;
@@ -469,7 +471,8 @@ public abstract class PolarisAuthzTestBase {
             passthroughView,
             securityContext,
             Mockito.mock(),
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     this.baseCatalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
@@ -485,7 +488,7 @@ public abstract class PolarisAuthzTestBase {
       extends PolarisCallContextCatalogFactory {
 
     public TestPolarisCallContextCatalogFactory() {
-      super(null, null, null, null, null);
+      super(null, null, null, null, null, null);
     }
 
     @Inject
@@ -494,13 +497,15 @@ public abstract class PolarisAuthzTestBase {
         MetaStoreManagerFactory metaStoreManagerFactory,
         UserSecretsManagerFactory userSecretsManagerFactory,
         TaskExecutor taskExecutor,
-        FileIOFactory fileIOFactory) {
+        FileIOFactory fileIOFactory,
+        PolarisEventListener polarisEventListener) {
       super(
           entityManagerFactory,
           metaStoreManagerFactory,
           userSecretsManagerFactory,
           taskExecutor,
-          fileIOFactory);
+          fileIOFactory,
+          polarisEventListener);
     }
 
     @Override
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java
index 96b1010b4..0b6af6e8f 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java
@@ -81,6 +81,7 @@ import 
org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.NoOpPolarisEventListener;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TaskExecutor;
 import org.assertj.core.api.Assertions;
@@ -264,7 +265,8 @@ public class GenericTableCatalogTest {
             passthroughView,
             securityContext,
             taskExecutor,
-            fileIOFactory);
+            fileIOFactory,
+            new NoOpPolarisEventListener());
     this.icebergCatalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
index 11a938f31..e74b7d641 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java
@@ -1808,7 +1808,8 @@ public class IcebergCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             userSecretsManagerFactory,
             Mockito.mock(),
             new DefaultFileIOFactory(
-                realmEntityManagerFactory, managerFactory, new 
PolarisConfigurationStore() {})) {
+                realmEntityManagerFactory, managerFactory, new 
PolarisConfigurationStore() {}),
+            polarisEventListener) {
           @Override
           public Catalog createCallContextCatalog(
               CallContext context,
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
index 05ad0b54b..d81edd1d4 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java
@@ -19,7 +19,6 @@
 package org.apache.polaris.service.quarkus.catalog;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doReturn;
@@ -116,8 +115,15 @@ import 
org.apache.polaris.service.catalog.io.ExceptionMappingFileIO;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
 import org.apache.polaris.service.exception.FakeAzureHttpResponse;
 import org.apache.polaris.service.exception.IcebergExceptionMapper;
+import org.apache.polaris.service.quarkus.test.TestData;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TableCleanupTaskHandler;
 import org.apache.polaris.service.task.TaskExecutor;
@@ -164,17 +170,14 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
           
"polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"",
           "true",
           "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"",
-          "[\"FILE\"]");
+          "[\"FILE\"]",
+          "polaris.event-listener.type",
+          "test");
     }
   }
 
-  protected static final Namespace NS = Namespace.of("newdb");
-  protected static final TableIdentifier TABLE = TableIdentifier.of(NS, 
"table");
-  protected static final Schema SCHEMA =
-      new Schema(
-          required(3, "id", Types.IntegerType.get(), "unique ID 🤪"),
-          required(4, "data", Types.StringType.get()));
   private static final String VIEW_QUERY = "select * from ns1.layer1_table";
+
   public static final String CATALOG_NAME = "polaris-catalog";
   public static final String TEST_ACCESS_KEY = "test_access_key";
   public static final String SECRET_ACCESS_KEY = "secret_access_key";
@@ -198,6 +201,7 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
   @Inject PolarisStorageIntegrationProvider storageIntegrationProvider;
   @Inject UserSecretsManagerFactory userSecretsManagerFactory;
   @Inject PolarisDiagnostics diagServices;
+  @Inject PolarisEventListener polarisEventListener;
 
   private IcebergCatalog catalog;
   private CallContext callContext;
@@ -210,6 +214,7 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
   private FileIOFactory fileIOFactory;
   private PolarisEntity catalogEntity;
   private SecurityContext securityContext;
+  private TestPolarisEventListener testPolarisEventListener;
 
   @BeforeAll
   public static void setUpMocks() {
@@ -319,6 +324,7 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
         .thenReturn((PolarisStorageIntegration) storageIntegration);
 
     this.catalog = initCatalog("my-catalog", ImmutableMap.of());
+    testPolarisEventListener = (TestPolarisEventListener) polarisEventListener;
   }
 
   @AfterEach
@@ -354,7 +360,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             taskExecutor,
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     ImmutableMap.Builder<String, String> propertiesBuilder =
         ImmutableMap.<String, String>builder()
             .put(CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO")
@@ -649,7 +656,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             Mockito.mock(TaskExecutor.class),
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     catalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
@@ -984,7 +992,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             taskExecutor,
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     catalog.initialize(
         catalogWithoutStorage,
         ImmutableMap.of(
@@ -1050,7 +1059,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             taskExecutor,
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     catalog.initialize(
         catalogName,
         ImmutableMap.of(
@@ -1596,7 +1606,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             Mockito.mock(),
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     noPurgeCatalog.initialize(
         noPurgeCatalogName,
         ImmutableMap.of(
@@ -1704,7 +1715,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             Mockito.mock(),
-            measured);
+            measured,
+            polarisEventListener);
     catalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
@@ -1714,8 +1726,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
         .as("Nothing was created yet")
         .isEqualTo(0);
 
-    catalog.createNamespace(NS);
-    Table table = catalog.buildTable(TABLE, SCHEMA).create();
+    catalog.createNamespace(TestData.NAMESPACE);
+    Table table = catalog.buildTable(TestData.TABLE, TestData.SCHEMA).create();
 
     // Asserting greaterThan 0 is sufficient for validating that the wrapper 
works without making
     // assumptions about the
@@ -1727,7 +1739,9 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
         .as("A table was read and written, but a trip to storage was made")
         .isEqualTo(0);
 
-    Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should 
succeed").isTrue();
+    Assertions.assertThat(catalog.dropTable(TestData.TABLE))
+        .as("Table deletion should succeed")
+        .isTrue();
     TaskEntity taskEntity =
         TaskEntity.of(
             metaStoreManager
@@ -1801,7 +1815,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             Mockito.mock(TaskExecutor.class),
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     catalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
@@ -1849,7 +1864,8 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             Mockito.mock(TaskExecutor.class),
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     catalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
@@ -1924,6 +1940,35 @@ public abstract class IcebergCatalogTest extends 
CatalogTests<IcebergCatalog> {
     }
   }
 
+  /**
+   * Creates a table, commits two successive values for one property, and checks the latest
+   * refresh and commit events recorded by the {@link TestPolarisEventListener}.
+   *
+   * <p>The latest commit events are expected to carry the first value in {@code base()} and the
+   * second value in {@code metadata()}.
+   */
+  @Test
+  public void testEventsAreEmitted() {
+    final String propKey = "foo";
+    final String oldValue = "bar1";
+    final String newValue = "bar2";
+
+    IcebergCatalog icebergCatalog = catalog();
+    icebergCatalog.createNamespace(TestData.NAMESPACE);
+    Table createdTable = icebergCatalog.buildTable(TestData.TABLE, TestData.SCHEMA).create();
+
+    // Two commits: the second one has a base that already holds the first value.
+    createdTable.updateProperties().set(propKey, oldValue).commit();
+    createdTable.updateProperties().set(propKey, newValue).commit();
+
+    var beforeRefresh = testPolarisEventListener.getLatest(BeforeTableRefreshedEvent.class);
+    Assertions.assertThat(beforeRefresh.tableIdentifier()).isEqualTo(TestData.TABLE);
+
+    var afterRefresh = testPolarisEventListener.getLatest(AfterTableRefreshedEvent.class);
+    Assertions.assertThat(afterRefresh.tableIdentifier()).isEqualTo(TestData.TABLE);
+
+    var beforeCommit = testPolarisEventListener.getLatest(BeforeTableCommitedEvent.class);
+    Assertions.assertThat(beforeCommit.identifier()).isEqualTo(TestData.TABLE);
+    Assertions.assertThat(beforeCommit.base().properties().get(propKey)).isEqualTo(oldValue);
+    Assertions.assertThat(beforeCommit.metadata().properties().get(propKey)).isEqualTo(newValue);
+
+    var afterCommit = testPolarisEventListener.getLatest(AfterTableCommitedEvent.class);
+    Assertions.assertThat(afterCommit.identifier()).isEqualTo(TestData.TABLE);
+    Assertions.assertThat(afterCommit.base().properties().get(propKey)).isEqualTo(oldValue);
+    Assertions.assertThat(afterCommit.metadata().properties().get(propKey)).isEqualTo(newValue);
+  }
+
   private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) {
     return (InMemoryFileIO) ((ExceptionMappingFileIO) 
catalog.getIo()).getInnerIo();
   }
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java
index 25089d5c4..c66e88b37 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewCatalogTests;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
@@ -67,12 +68,21 @@ import 
org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
 import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
+import org.apache.polaris.service.quarkus.test.TestData;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
+import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assumptions;
 import org.assertj.core.configuration.PreferredAssumptionException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.io.TempDir;
 import org.mockito.Mockito;
@@ -98,7 +108,9 @@ public class IcebergCatalogViewTest extends 
ViewCatalogTests<IcebergCatalog> {
           
"polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"",
           "true",
           "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"",
-          "[\"FILE\"]");
+          "[\"FILE\"]",
+          "polaris.event-listener.type",
+          "test");
     }
   }
 
@@ -116,6 +128,7 @@ public class IcebergCatalogViewTest extends 
ViewCatalogTests<IcebergCatalog> {
   @Inject UserSecretsManagerFactory userSecretsManagerFactory;
   @Inject PolarisConfigurationStore configurationStore;
   @Inject PolarisDiagnostics diagServices;
+  @Inject PolarisEventListener polarisEventListener;
 
   private IcebergCatalog catalog;
 
@@ -124,6 +137,8 @@ public class IcebergCatalogViewTest extends 
ViewCatalogTests<IcebergCatalog> {
   private UserSecretsManager userSecretsManager;
   private PolarisCallContext polarisContext;
 
+  private TestPolarisEventListener testPolarisEventListener;
+
   @BeforeAll
   public static void setUpMocks() {
     PolarisStorageIntegrationProviderImpl mock =
@@ -212,6 +227,8 @@ public class IcebergCatalogViewTest extends 
ViewCatalogTests<IcebergCatalog> {
     FileIOFactory fileIOFactory =
         new DefaultFileIOFactory(
             new RealmEntityManagerFactory(managerFactory), managerFactory, 
configurationStore);
+
+    testPolarisEventListener = (TestPolarisEventListener) polarisEventListener;
     this.catalog =
         new IcebergCatalog(
             entityManager,
@@ -220,7 +237,8 @@ public class IcebergCatalogViewTest extends 
ViewCatalogTests<IcebergCatalog> {
             passthroughView,
             securityContext,
             Mockito.mock(),
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
     Map<String, String> properties =
         ImmutableMap.<String, String>builder()
             .put(CatalogProperties.FILE_IO_IMPL, 
"org.apache.iceberg.inmemory.InMemoryFileIO")
@@ -249,4 +267,39 @@ public class IcebergCatalogViewTest extends 
ViewCatalogTests<IcebergCatalog> {
   protected boolean requiresNamespaceCreate() {
     return true;
   }
+
+  /**
+   * Creates a view, commits two successive values for one property, and checks the latest
+   * refresh and commit events recorded by the {@link TestPolarisEventListener}.
+   *
+   * <p>The latest commit events are expected to carry the first value in {@code base()} and the
+   * second value in {@code metadata()}.
+   */
+  @Test
+  public void testEventsAreEmitted() {
+    final String propKey = "foo";
+    final String oldValue = "bar1";
+    final String newValue = "bar2";
+
+    IcebergCatalog icebergCatalog = catalog();
+    icebergCatalog.createNamespace(TestData.NAMESPACE);
+    // TestData.TABLE is reused as the view's identifier.
+    View createdView =
+        icebergCatalog
+            .buildView(TestData.TABLE)
+            .withDefaultNamespace(TestData.NAMESPACE)
+            .withSchema(TestData.SCHEMA)
+            .withQuery("a", "b")
+            .create();
+
+    // Two commits: the second one has a base that already holds the first value.
+    createdView.updateProperties().set(propKey, oldValue).commit();
+    createdView.updateProperties().set(propKey, newValue).commit();
+
+    var beforeRefresh = testPolarisEventListener.getLatest(BeforeViewRefreshedEvent.class);
+    Assertions.assertThat(beforeRefresh.viewIdentifier()).isEqualTo(TestData.TABLE);
+
+    var afterRefresh = testPolarisEventListener.getLatest(AfterViewRefreshedEvent.class);
+    Assertions.assertThat(afterRefresh.viewIdentifier()).isEqualTo(TestData.TABLE);
+
+    var beforeCommit = testPolarisEventListener.getLatest(BeforeViewCommitedEvent.class);
+    Assertions.assertThat(beforeCommit.identifier()).isEqualTo(TestData.TABLE);
+    Assertions.assertThat(beforeCommit.base().properties().get(propKey)).isEqualTo(oldValue);
+    Assertions.assertThat(beforeCommit.metadata().properties().get(propKey)).isEqualTo(newValue);
+
+    var afterCommit = testPolarisEventListener.getLatest(AfterViewCommitedEvent.class);
+    Assertions.assertThat(afterCommit.identifier()).isEqualTo(TestData.TABLE);
+    Assertions.assertThat(afterCommit.base().properties().get(propKey)).isEqualTo(oldValue);
+    Assertions.assertThat(afterCommit.metadata().properties().get(propKey)).isEqualTo(newValue);
+  }
 }
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
index a5bf98ba9..68e2c35e0 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
@@ -94,6 +94,7 @@ import 
org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.policy.PolicyCatalog;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.NoOpPolarisEventListener;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TaskExecutor;
 import org.apache.polaris.service.types.ApplicablePolicy;
@@ -287,7 +288,8 @@ public class PolicyCatalogTest {
             passthroughView,
             securityContext,
             taskExecutor,
-            fileIOFactory);
+            fileIOFactory,
+            new NoOpPolarisEventListener());
     this.icebergCatalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java
index a5f66ec5b..48aa45001 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java
@@ -31,6 +31,9 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
 import 
org.apache.polaris.service.quarkus.ratelimiter.RateLimiterFilterTest.Profile;
 import org.apache.polaris.service.quarkus.test.PolarisIntegrationTestFixture;
 import org.apache.polaris.service.quarkus.test.PolarisIntegrationTestHelper;
@@ -81,7 +84,9 @@ public class RateLimiterFilterTest {
           "polaris.authentication.token-broker.type",
           "symmetric-key",
           "polaris.authentication.token-broker.symmetric-key.secret",
-          "secret");
+          "secret",
+          "polaris.event-listener.type",
+          "test");
     }
   }
 
@@ -90,6 +95,7 @@ public class RateLimiterFilterTest {
 
   @Inject PolarisIntegrationTestHelper helper;
   @Inject MeterRegistry meterRegistry;
+  @Inject PolarisEventListener polarisEventListener;
 
   private TestEnvironment testEnv;
   private PolarisIntegrationTestFixture fixture;
@@ -145,6 +151,11 @@ public class RateLimiterFilterTest {
     }
     requestAsserter.accept(Status.TOO_MANY_REQUESTS);
 
+    BeforeRequestRateLimitedEvent event =
+        ((TestPolarisEventListener) polarisEventListener)
+            .getLatest(BeforeRequestRateLimitedEvent.class);
+    assertThat(event.method()).isEqualTo("GET");
+
     // Examples of expected metrics:
     // 
http_server_requests_seconds_count{application="Polaris",environment="prod",method="GET",outcome="CLIENT_ERROR",realm_id="org_apache_polaris_service_ratelimiter_RateLimiterFilterTest",status="429",uri="/api/management/v1/principal-roles"}
 1.0
     // 
polaris_principal_roles_listPrincipalRoles_seconds_count{application="Polaris",class="org.apache.polaris.service.admin.api.PolarisPrincipalRolesApi",environment="prod",exception="none",method="listPrincipalRoles"}
 50.0
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java
new file mode 100644
index 000000000..8c061f201
--- /dev/null
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.test;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+
+/** Contains test data that can be reused across tests. */
+public class TestData {
+  public static final Namespace NAMESPACE = Namespace.of("newdb");
+  public static final TableIdentifier TABLE = TableIdentifier.of(NAMESPACE, 
"table");
+  public static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get(), "unique ID 🤪"),
+          required(4, "data", Types.StringType.get()));
+}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index afa7d1477..c66def885 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -123,6 +123,15 @@ import org.apache.polaris.core.storage.StorageLocation;
 import org.apache.polaris.service.catalog.SupportsNotifications;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOUtil;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
 import org.apache.polaris.service.task.TaskExecutor;
 import org.apache.polaris.service.types.NotificationRequest;
 import org.apache.polaris.service.types.NotificationType;
@@ -171,6 +180,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
   private final CatalogEntity catalogEntity;
   private final TaskExecutor taskExecutor;
   private final SecurityContext securityContext;
+  private final PolarisEventListener polarisEventListener;
+
   private String ioImplClassName;
   private FileIO catalogFileIO;
   private final String catalogName;
@@ -197,7 +208,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
       PolarisResolutionManifestCatalogView resolvedEntityView,
       SecurityContext securityContext,
       TaskExecutor taskExecutor,
-      FileIOFactory fileIOFactory) {
+      FileIOFactory fileIOFactory,
+      PolarisEventListener polarisEventListener) {
     this.entityManager = entityManager;
     this.callContext = callContext;
     this.resolvedEntityView = resolvedEntityView;
@@ -209,6 +221,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
     this.catalogName = catalogEntity.getName();
     this.fileIOFactory = fileIOFactory;
     this.metaStoreManager = metaStoreManager;
+    this.polarisEventListener = polarisEventListener;
   }
 
   @Override
@@ -1319,6 +1332,7 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
       if (latestLocation == null) {
         disableRefresh();
       } else {
+        polarisEventListener.onBeforeTableRefreshed(new 
BeforeTableRefreshedEvent(tableIdentifier));
         refreshFromMetadataLocation(
             latestLocation,
             SHOULD_RETRY_REFRESH_PREDICATE,
@@ -1338,10 +1352,14 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
                       Set.of(PolarisStorageActions.READ));
               return TableMetadataParser.read(fileIO, metadataLocation);
             });
+        polarisEventListener.onAfterTableRefreshed(new 
AfterTableRefreshedEvent(tableIdentifier));
       }
     }
 
     public void doCommit(TableMetadata base, TableMetadata metadata) {
+      polarisEventListener.onBeforeTableCommited(
+          new BeforeTableCommitedEvent(tableIdentifier, base, metadata));
+
       LOGGER.debug(
           "doCommit for table {} with base {}, metadata {}", tableIdentifier, 
base, metadata);
       // TODO: Maybe avoid writing metadata if there's definitely a 
transaction conflict
@@ -1494,6 +1512,9 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
       } else {
         updateTableLike(tableIdentifier, entity);
       }
+
+      polarisEventListener.onAfterTableCommited(
+          new AfterTableCommitedEvent(tableIdentifier, base, metadata));
     }
 
     @Override
@@ -1684,6 +1705,7 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
       if (latestLocation == null) {
         disableRefresh();
       } else {
+        polarisEventListener.onBeforeViewRefreshed(new 
BeforeViewRefreshedEvent(identifier));
         refreshFromMetadataLocation(
             latestLocation,
             SHOULD_RETRY_REFRESH_PREDICATE,
@@ -1705,10 +1727,14 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
 
               return 
ViewMetadataParser.read(fileIO.newInputFile(metadataLocation));
             });
+        polarisEventListener.onAfterViewRefreshed(new 
AfterViewRefreshedEvent(identifier));
       }
     }
 
     public void doCommit(ViewMetadata base, ViewMetadata metadata) {
+      polarisEventListener.onBeforeViewCommited(
+          new BeforeViewCommitedEvent(identifier, base, metadata));
+
       // TODO: Maybe avoid writing metadata if there's definitely a 
transaction conflict
       LOGGER.debug("doCommit for view {} with base {}, metadata {}", 
identifier, base, metadata);
       if (null == base && !namespaceExists(identifier.namespace())) {
@@ -1802,6 +1828,9 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
       } else {
         updateTableLike(identifier, entity);
       }
+
+      polarisEventListener.onAfterViewCommited(
+          new AfterViewCommitedEvent(identifier, base, metadata));
     }
 
     protected String writeNewMetadataIfRequired(ViewMetadata metadata) {
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
index 94fe19760..b1d28ce89 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
@@ -38,6 +38,7 @@ import 
org.apache.polaris.core.secrets.UserSecretsManagerFactory;
 import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.events.PolarisEventListener;
 import org.apache.polaris.service.task.TaskExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +56,7 @@ public class PolarisCallContextCatalogFactory implements 
CallContextCatalogFacto
   private final FileIOFactory fileIOFactory;
   private final MetaStoreManagerFactory metaStoreManagerFactory;
   private final UserSecretsManagerFactory userSecretsManagerFactory;
+  private final PolarisEventListener polarisEventListener;
 
   @Inject
   public PolarisCallContextCatalogFactory(
@@ -62,12 +64,14 @@ public class PolarisCallContextCatalogFactory implements 
CallContextCatalogFacto
       MetaStoreManagerFactory metaStoreManagerFactory,
       UserSecretsManagerFactory userSecretsManagerFactory,
       TaskExecutor taskExecutor,
-      FileIOFactory fileIOFactory) {
+      FileIOFactory fileIOFactory,
+      PolarisEventListener polarisEventListener) {
     this.entityManagerFactory = entityManagerFactory;
     this.metaStoreManagerFactory = metaStoreManagerFactory;
     this.userSecretsManagerFactory = userSecretsManagerFactory;
     this.taskExecutor = taskExecutor;
     this.fileIOFactory = fileIOFactory;
+    this.polarisEventListener = polarisEventListener;
   }
 
   @Override
@@ -95,7 +99,8 @@ public class PolarisCallContextCatalogFactory implements 
CallContextCatalogFacto
             resolvedManifest,
             securityContext,
             taskExecutor,
-            fileIOFactory);
+            fileIOFactory,
+            polarisEventListener);
 
     
context.contextVariables().put(CallContext.REQUEST_PATH_CATALOG_INSTANCE_KEY, 
catalogInstance);
 
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
new file mode 100644
index 000000000..c952997df
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted after Polaris performs a commit to a table. This is not emitted if there's an exception
+ * while committing.
+ *
+ * @param identifier The identifier of the table that was committed.
+ * @param base The table's old metadata, which the commit was based on.
+ * @param metadata The table's new metadata.
+ */
+public record AfterTableCommitedEvent(
+    TableIdentifier identifier, TableMetadata base, TableMetadata metadata)
+    implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
new file mode 100644
index 000000000..be38a8baa
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted after Polaris refreshes its known version of a table's metadata by fetching the latest.
+ *
+ * @param tableIdentifier The identifier of the table that was refreshed.
+ */
+public record AfterTableRefreshedEvent(TableIdentifier tableIdentifier) 
implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
new file mode 100644
index 000000000..638ba84fb
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.polaris.core.context.CallContext;
+
+/**
+ * Emitted after an attempt of an async task, such as manifest file cleanup, finishes.
+ *
+ * @param taskEntityId The ID of the TaskEntity.
+ * @param callContext The CallContext the task is being executed under.
+ * @param attempt The attempt number. Each retry of the task will have its own attempt number. The
+ *     initial (non-retried) attempt starts counting from 1.
+ * @param success Whether or not the attempt succeeded.
+ */
+public record AfterTaskAttemptedEvent(
+    long taskEntityId, CallContext callContext, int attempt, boolean success)
+    implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
new file mode 100644
index 000000000..eb2ca2414
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.view.ViewMetadata;
+
+/**
+ * Emitted after Polaris performs a commit to a view. This is not emitted if there's an exception
+ * while committing.
+ *
+ * @param identifier The identifier of the view that was committed.
+ * @param base The view's old metadata, which the commit was based on; may be {@code null}.
+ * @param metadata The view's new metadata.
+ */
+public record AfterViewCommitedEvent(
+    TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) 
implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
new file mode 100644
index 000000000..249220ddd
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted after Polaris refreshes its known version of a view's metadata by fetching the latest.
+ *
+ * @param viewIdentifier The identifier of the view that was refreshed.
+ */
+public record AfterViewRefreshedEvent(TableIdentifier viewIdentifier) 
implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
new file mode 100644
index 000000000..1d9780ebe
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+/**
+ * Emitted before the RateLimiterFilter rejects a request due to exceeding the rate limit.
+ *
+ * @param method The request's HTTP method.
+ * @param absolutePath The request's absolute path.
+ */
+public record BeforeRequestRateLimitedEvent(String method, String absolutePath)
+    implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
new file mode 100644
index 000000000..2bcc49ab6
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted when Polaris intends to perform a commit to a table. There is no guarantee on the order
+ * of this event relative to the validation checks we've performed, which means the commit may still
+ * fail Polaris-side validation checks.
+ *
+ * @param identifier The identifier of the table being committed to.
+ * @param base The table's old metadata, which the commit is based on.
+ * @param metadata The table's new metadata that the commit intends to apply.
+ */
+public record BeforeTableCommitedEvent(
+    TableIdentifier identifier, TableMetadata base, TableMetadata metadata)
+    implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
new file mode 100644
index 000000000..f319298f5
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted when Polaris intends to refresh its known version of a table's metadata by fetching the
+ * latest.
+ *
+ * @param tableIdentifier The identifier of the table being refreshed.
+ */
+public record BeforeTableRefreshedEvent(TableIdentifier tableIdentifier) 
implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
new file mode 100644
index 000000000..a7fa7231e
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.polaris.core.context.CallContext;
+
+/**
+ * Emitted before an attempt of an async task, such as manifest file cleanup, begins.
+ *
+ * @param taskEntityId The ID of the TaskEntity.
+ * @param callContext The CallContext the task is being executed under.
+ * @param attempt The attempt number. Each retry of the task will have its own attempt number. The
+ *     initial (non-retried) attempt starts counting from 1.
+ */
+public record BeforeTaskAttemptedEvent(long taskEntityId, CallContext 
callContext, int attempt)
+    implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
new file mode 100644
index 000000000..16e460d80
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.view.ViewMetadata;
+
+/**
+ * Emitted when Polaris intends to perform a commit to a view. There is no guarantee on the order of
+ * this event relative to the validation checks we've performed, which means the commit may still
+ * fail Polaris-side validation checks.
+ *
+ * @param identifier The identifier of the view being committed to.
+ * @param base The view's old metadata, which the commit is based on; may be {@code null}.
+ * @param metadata The view's new metadata that the commit intends to apply.
+ */
+public record BeforeViewCommitedEvent(
+    TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) 
implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
new file mode 100644
index 000000000..6f58d2ca2
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Emitted when Polaris intends to refresh its known version of a view's metadata by fetching the
+ * latest.
+ *
+ * @param viewIdentifier The identifier of the view being refreshed.
+ */
+public record BeforeViewRefreshedEvent(TableIdentifier viewIdentifier) 
implements PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java
new file mode 100644
index 000000000..f31fbcef5
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+
+/**
+ * Event listener that does nothing: it overrides none of the {@link PolarisEventListener}
+ * handlers, whose bodies are empty, so every event is ignored. Registered under the identifier
+ * {@code "no-op"}.
+ */
+@ApplicationScoped
+@Identifier("no-op")
+public class NoOpPolarisEventListener extends PolarisEventListener {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java
new file mode 100644
index 000000000..4922c02f4
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+/**
+ * Represents an event emitted by Polaris. Currently there's no common data across events, so this
+ * is just a marker interface.
+ */
+public interface PolarisEvent {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java
new file mode 100644
index 000000000..485766bb2
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+/**
+ * Represents an event listener that can respond to notable moments during Polaris's execution.
+ * Event details are documented under the event objects themselves.
+ *
+ * <p>Every handler below has an empty body, so a subclass only needs to override the handlers for
+ * the events it cares about.
+ */
+public abstract class PolarisEventListener {
+  /** {@link BeforeRequestRateLimitedEvent} */
+  public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) 
{}
+
+  /** {@link BeforeTableCommitedEvent} */
+  public void onBeforeTableCommited(BeforeTableCommitedEvent event) {}
+
+  /** {@link AfterTableCommitedEvent} */
+  public void onAfterTableCommited(AfterTableCommitedEvent event) {}
+
+  /** {@link BeforeViewCommitedEvent} */
+  public void onBeforeViewCommited(BeforeViewCommitedEvent event) {}
+
+  /** {@link AfterViewCommitedEvent} */
+  public void onAfterViewCommited(AfterViewCommitedEvent event) {}
+
+  /** {@link BeforeTableRefreshedEvent} */
+  public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {}
+
+  /** {@link AfterTableRefreshedEvent} */
+  public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {}
+
+  /** {@link BeforeViewRefreshedEvent} */
+  public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {}
+
+  /** {@link AfterViewRefreshedEvent} */
+  public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {}
+
+  /** {@link BeforeTaskAttemptedEvent} */
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {}
+
+  /** {@link AfterTaskAttemptedEvent} */
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {}
+}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java
 
b/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java
new file mode 100644
index 000000000..668edc7fa
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import com.google.common.collect.Streams;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Event listener that stores all emitted events forever. Not recommended for use in production.
+ *
+ * <p>Events are appended to an in-memory list in the order the handlers are invoked, and the list
+ * is never trimmed. The backing list is a plain {@link ArrayList} with no synchronization.
+ */
+@ApplicationScoped
+@Identifier("test")
+public class TestPolarisEventListener extends PolarisEventListener {
+  private final List<PolarisEvent> history = new ArrayList<>();
+
+  /**
+   * Returns the most recently recorded event that is an instance of the given type.
+   *
+   * @param type the class of the event to look up
+   * @param <T> the event type
+   * @return the last recorded event that is an instance of {@code type}
+   * @throws java.util.NoSuchElementException if no event of that type has been recorded
+   */
+  public <T> T getLatest(Class<T> type) {
+    // Class#cast is checked at runtime and avoids the unchecked (T) cast warning.
+    return Streams.findLast(history.stream().filter(type::isInstance).map(type::cast))
+        .orElseThrow();
+  }
+
+  @Override
+  public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onBeforeTableCommited(BeforeTableCommitedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onAfterTableCommited(AfterTableCommitedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onBeforeViewCommited(BeforeViewCommitedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onAfterViewCommited(AfterViewCommitedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {
+    history.add(event);
+  }
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {
+    history.add(event);
+  }
+}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
 
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
index 28bb60589..7b0fda93f 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java
@@ -28,6 +28,8 @@ import jakarta.ws.rs.core.Response;
 import jakarta.ws.rs.ext.Provider;
 import java.io.IOException;
 import org.apache.polaris.service.config.PolarisFilterPriorities;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,16 +42,21 @@ public class RateLimiterFilter implements 
ContainerRequestFilter {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RateLimiterFilter.class);
 
   private final RateLimiter rateLimiter;
+  private final PolarisEventListener polarisEventListener;
 
   @Inject
-  public RateLimiterFilter(RateLimiter rateLimiter) {
+  public RateLimiterFilter(RateLimiter rateLimiter, PolarisEventListener 
polarisEventListener) {
     this.rateLimiter = rateLimiter;
+    this.polarisEventListener = polarisEventListener;
   }
 
   /** Returns a 429 if the rate limiter says so. Otherwise, forwards the 
request along. */
   @Override
   public void filter(ContainerRequestContext ctx) throws IOException {
     if (!rateLimiter.canProceed()) {
+      polarisEventListener.onBeforeRequestRateLimited(
+          new BeforeRequestRateLimitedEvent(
+              ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString()));
       
ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build());
       LOGGER.atDebug().log("Rate limiting request");
     }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
 
b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
index 6b42c3fc8..140931031 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
@@ -33,6 +33,9 @@ import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.TaskEntity;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.PolarisEventListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,14 +51,17 @@ public class TaskExecutorImpl implements TaskExecutor {
   private final MetaStoreManagerFactory metaStoreManagerFactory;
   private final TaskFileIOSupplier fileIOSupplier;
   private final List<TaskHandler> taskHandlers = new CopyOnWriteArrayList<>();
+  private final PolarisEventListener polarisEventListener;
 
   public TaskExecutorImpl(
       Executor executor,
       MetaStoreManagerFactory metaStoreManagerFactory,
-      TaskFileIOSupplier fileIOSupplier) {
+      TaskFileIOSupplier fileIOSupplier,
+      PolarisEventListener polarisEventListener) {
     this.executor = executor;
     this.metaStoreManagerFactory = metaStoreManagerFactory;
     this.fileIOSupplier = fileIOSupplier;
+    this.polarisEventListener = polarisEventListener;
   }
 
   public void init() {
@@ -111,45 +117,54 @@ public class TaskExecutorImpl implements TaskExecutor {
   }
 
   protected void handleTask(long taskEntityId, CallContext ctx, int attempt) {
-    // set the call context INSIDE the async task
-    CallContext.setCurrentContext(ctx);
-    LOGGER.info("Handling task entity id {}", taskEntityId);
-    PolarisMetaStoreManager metaStoreManager =
-        
metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext());
-    PolarisBaseEntity taskEntity =
-        metaStoreManager
-            .loadEntity(ctx.getPolarisCallContext(), 0L, taskEntityId, 
PolarisEntityType.TASK)
-            .getEntity();
-    if (!PolarisEntityType.TASK.equals(taskEntity.getType())) {
-      throw new IllegalArgumentException("Provided taskId must be a task 
entity type");
-    }
-    TaskEntity task = TaskEntity.of(taskEntity);
-    Optional<TaskHandler> handlerOpt =
-        taskHandlers.stream().filter(th -> th.canHandleTask(task)).findFirst();
-    if (handlerOpt.isEmpty()) {
-      LOGGER
-          .atWarn()
-          .addKeyValue("taskEntityId", taskEntityId)
-          .addKeyValue("taskType", task.getTaskType())
-          .log("Unable to find handler for task type");
-      return;
-    }
-    TaskHandler handler = handlerOpt.get();
-    boolean success = handler.handleTask(task, ctx);
-    if (success) {
-      LOGGER
-          .atInfo()
-          .addKeyValue("taskEntityId", taskEntityId)
-          .addKeyValue("handlerClass", handler.getClass())
-          .log("Task successfully handled");
-      metaStoreManager.dropEntityIfExists(
-          ctx.getPolarisCallContext(), null, taskEntity, Map.of(), false);
-    } else {
-      LOGGER
-          .atWarn()
-          .addKeyValue("taskEntityId", taskEntityId)
-          .addKeyValue("taskEntityName", taskEntity.getName())
-          .log("Unable to execute async task");
+    polarisEventListener.onBeforeTaskAttempted(
+        new BeforeTaskAttemptedEvent(taskEntityId, ctx, attempt));
+
+    boolean success = false;
+    try {
+      // set the call context INSIDE the async task
+      CallContext.setCurrentContext(ctx);
+      LOGGER.info("Handling task entity id {}", taskEntityId);
+      PolarisMetaStoreManager metaStoreManager =
+          
metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext());
+      PolarisBaseEntity taskEntity =
+          metaStoreManager
+              .loadEntity(ctx.getPolarisCallContext(), 0L, taskEntityId, 
PolarisEntityType.TASK)
+              .getEntity();
+      if (!PolarisEntityType.TASK.equals(taskEntity.getType())) {
+        throw new IllegalArgumentException("Provided taskId must be a task 
entity type");
+      }
+      TaskEntity task = TaskEntity.of(taskEntity);
+      Optional<TaskHandler> handlerOpt =
+          taskHandlers.stream().filter(th -> 
th.canHandleTask(task)).findFirst();
+      if (handlerOpt.isEmpty()) {
+        LOGGER
+            .atWarn()
+            .addKeyValue("taskEntityId", taskEntityId)
+            .addKeyValue("taskType", task.getTaskType())
+            .log("Unable to find handler for task type");
+        return;
+      }
+      TaskHandler handler = handlerOpt.get();
+      success = handler.handleTask(task, ctx);
+      if (success) {
+        LOGGER
+            .atInfo()
+            .addKeyValue("taskEntityId", taskEntityId)
+            .addKeyValue("handlerClass", handler.getClass())
+            .log("Task successfully handled");
+        metaStoreManager.dropEntityIfExists(
+            ctx.getPolarisCallContext(), null, taskEntity, Map.of(), false);
+      } else {
+        LOGGER
+            .atWarn()
+            .addKeyValue("taskEntityId", taskEntityId)
+            .addKeyValue("taskEntityName", taskEntity.getName())
+            .log("Unable to execute async task");
+      }
+    } finally {
+      polarisEventListener.onAfterTaskAttempted(
+          new AfterTaskAttemptedEvent(taskEntityId, ctx, attempt, success));
     }
   }
 }
diff --git 
a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
 
b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
index 9a070273b..a6854703d 100644
--- 
a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
+++ 
b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -244,7 +244,8 @@ public class FileIOFactoryTest {
             passthroughView,
             services.securityContext(),
             services.taskExecutor(),
-            services.fileIOFactory());
+            services.fileIOFactory(),
+            services.polarisEventListener());
     polarisCatalog.initialize(
         CATALOG_NAME,
         ImmutableMap.of(
diff --git 
a/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
 
b/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
new file mode 100644
index 000000000..1cc88fc10
--- /dev/null
+++ 
b/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.TaskEntity;
+import org.apache.polaris.core.persistence.BasePersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.TestPolarisEventListener;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link TaskExecutorImpl}. */
+public class TaskExecutorImplTest {
+  /**
+   * Checks that handling a task records a {@link BeforeTaskAttemptedEvent} before the task handler
+   * runs and an {@link AfterTaskAttemptedEvent} afterwards, each carrying the task id, call
+   * context and attempt number that were passed to {@code handleTask}.
+   */
+  @Test
+  void testEventsAreEmitted() {
+    String realm = "myrealm";
+    RealmContext realmContext = () -> realm;
+
+    TestServices testServices = 
TestServices.builder().realmContext(realmContext).build();
+
+    // The cast relies on TestServices wiring a TestPolarisEventListener as its listener.
+    TestPolarisEventListener testPolarisEventListener =
+        (TestPolarisEventListener) testServices.polarisEventListener();
+
+    MetaStoreManagerFactory metaStoreManagerFactory = 
testServices.metaStoreManagerFactory();
+    PolarisMetaStoreManager metaStoreManager =
+        metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
+    BasePersistence bp = 
metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get();
+
+    PolarisCallContext polarisCallCtx =
+        new PolarisCallContext(bp, testServices.polarisDiagnostics());
+    CallContext callContext = CallContext.of(realmContext, polarisCallCtx);
+
+    // This task doesn't have a type so it won't be handle-able by a real handler. We register a
+    // test TaskHandler below that can handle any task.
+    TaskEntity taskEntity =
+        new TaskEntity.Builder()
+            .setName("mytask")
+            
.setId(metaStoreManager.generateNewEntityId(polarisCallCtx).getId())
+            .build();
+    metaStoreManager.createEntityIfNotExists(polarisCallCtx, null, taskEntity);
+
+    int attempt = 1;
+
+    // Runnable::run is an Executor that runs each submitted Runnable inline on the calling thread.
+    TaskExecutorImpl executor =
+        new TaskExecutorImpl(
+            Runnable::run,
+            testServices.metaStoreManagerFactory(),
+            new TaskFileIOSupplier(testServices.fileIOFactory()),
+            testServices.polarisEventListener());
+    executor.addTaskHandler(
+        new TaskHandler() {
+          @Override
+          public boolean canHandleTask(TaskEntity task) {
+            return true;
+          }
+
+          @Override
+          public boolean handleTask(TaskEntity task, CallContext callContext) {
+            // By the time the handler runs, the "before" event must already be recorded.
+            var beforeTaskAttemptedEvent =
+                
testPolarisEventListener.getLatest(BeforeTaskAttemptedEvent.class);
+            Assertions.assertEquals(taskEntity.getId(), 
beforeTaskAttemptedEvent.taskEntityId());
+            Assertions.assertEquals(callContext, 
beforeTaskAttemptedEvent.callContext());
+            Assertions.assertEquals(attempt, 
beforeTaskAttemptedEvent.attempt());
+            return true;
+          }
+        });
+
+    executor.handleTask(taskEntity.getId(), callContext, attempt);
+
+    // The handler returned true, so the "after" event is expected to report success.
+    var afterAttemptTaskEvent = 
testPolarisEventListener.getLatest(AfterTaskAttemptedEvent.class);
+    Assertions.assertEquals(taskEntity.getId(), 
afterAttemptTaskEvent.taskEntityId());
+    Assertions.assertEquals(callContext, afterAttemptTaskEvent.callContext());
+    Assertions.assertEquals(attempt, afterAttemptTaskEvent.attempt());
+    Assertions.assertTrue(afterAttemptTaskEvent.success());
+  }
+}
diff --git 
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
 
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 88406b895..cb48ad0a1 100644
--- 
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ 
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -56,6 +56,8 @@ import 
org.apache.polaris.service.config.DefaultConfigurationStore;
 import org.apache.polaris.service.config.RealmEntityManagerFactory;
 import org.apache.polaris.service.context.CallContextCatalogFactory;
 import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
+import org.apache.polaris.service.events.PolarisEventListener;
+import org.apache.polaris.service.events.TestPolarisEventListener;
 import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
 import org.apache.polaris.service.secrets.UnsafeInMemorySecretsManagerFactory;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
@@ -75,7 +77,8 @@ public record TestServices(
     RealmContext realmContext,
     SecurityContext securityContext,
     FileIOFactory fileIOFactory,
-    TaskExecutor taskExecutor) {
+    TaskExecutor taskExecutor,
+    PolarisEventListener polarisEventListener) {
 
   private static final RealmContext TEST_REALM = () -> "test-realm";
   private static final String GCP_ACCESS_TOKEN = "abc";
@@ -175,13 +178,15 @@ public record TestServices(
 
       TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);
 
+      PolarisEventListener polarisEventListener = new 
TestPolarisEventListener();
       CallContextCatalogFactory callContextFactory =
           new PolarisCallContextCatalogFactory(
               realmEntityManagerFactory,
               metaStoreManagerFactory,
               userSecretsManagerFactory,
               taskExecutor,
-              fileIOFactory);
+              fileIOFactory,
+              polarisEventListener);
 
       IcebergCatalogAdapter service =
           new IcebergCatalogAdapter(
@@ -252,7 +257,8 @@ public record TestServices(
           realmContext,
           securityContext,
           fileIOFactory,
-          taskExecutor);
+          taskExecutor,
+          polarisEventListener);
     }
   }
 }


Reply via email to